Job Logical Plan

An example of a general logical plan

The above figure illustrates a general job logical plan, which takes 4 steps to get the final result:

  1. Create the initial RDD from any source (e.g., in-memory data structures, a local file, HDFS, or HBase). Note that createRDD() is equivalent to the parallelize() mentioned in the previous chapter.

  2. A series of transformation operations on the RDD, denoted as transformation(). Each transformation() produces one or more RDD[T]s, where T can be any type in Scala.

    If T is (K, V), then K cannot be a collection type, such as Array or List, since it is hard to define a partition function on collections.

  3. An action operation, denoted as action(), is called on the final RDD. Then each partition produces a computing result.

  4. These results are sent to the driver, where f(List[Result]) is performed to compute the final result. For example, count() takes two steps: action() and sum(). (A small sketch of these four steps follows below.)
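The four steps can be seen in a minimal sketch, assuming an existing SparkContext named sc running in local mode:

```scala
val initial = sc.parallelize(1 to 100, 4)             // 1. create the initial RDD (4 partitions)
val transformed = initial.map(_ * 2).filter(_ > 50)   // 2. a chain of transformation()s
val result = transformed.count()                      // 3. action(): each partition produces a partial count
println(result)                                       // 4. the driver sums the partial counts -> 75
```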

An RDD can be cached in memory or persisted on disk by calling cache(), persist(), or checkpoint(). The number of partitions is usually set by the user. The partition mapping between two RDDs can be 1:1 or M:N. In the figure above, we can see both 1:1 and M:N mappings.
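As a small illustration (a sketch, not part of the original text), persisting an intermediate RDD lets later actions reuse it instead of recomputing it:

```scala
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000000, 4)
val squares = nums.map(x => x.toLong * x)
squares.persist(StorageLevel.MEMORY_AND_DISK)   // or squares.cache() for memory-only storage
println(squares.count())                        // the first action computes and persists the partitions
println(squares.filter(_ % 2 == 0).count())     // later actions reuse the persisted partitions
```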

Logical Plan

While writing Spark code, you might also have a logical plan in your mind (e.g., how many RDDs will be generated), like the one above. However, in general, more RDDs are generated at runtime.

In order to make this logical plan clear, we will answer the following questions from Spark's point of view: given a Spark program,

  • How to produce RDDs? What kinds of RDDs should be produced?
  • How to connect (i.e., build data dependency between) RDDs?

1. How to produce RDDs? What kinds of RDDs should be produced?

A transformation() usually produces a new RDD, but some transformation()s can produce multiple RDDs because they have multiple processing steps or contain several sub-transformation()s. That's why the number of RDDs is, in fact, larger than we might think.

A logical plan is essentially a computing chain. Every RDD has a compute() method, which reads input records (e.g., key/value pairs) from the previous RDD or from the data source, performs its transformation(), and then outputs new records.
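The "computing chain" idea can be sketched with plain Scala iterators (a simplification, not Spark's actual RDD code): each stage wraps the previous stage's iterator, and records flow lazily through the chain.

```scala
// Records of one partition of the data source.
val source: Iterator[Int] = Iterator(1, 2, 3, 4, 5)

val afterMap: Iterator[Int] = source.map(x => x * 2)      // compute() of a map-style RDD
val afterFilter: Iterator[Int] = afterMap.filter(_ > 4)   // compute() of a filter-style RDD

println(afterFilter.toList)   // List(6, 8, 10): nothing is computed until the records are pulled
```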

Which RDDs should be produced depends on the computing logic of the transformation(). Let's talk about some typical transformation()s and the RDDs they produce.

We can learn about the meaning of each transformation() on the Spark site. More details are listed in the following table, where iterator(split) means "for each record in the partition". Some cells in the table are left blank because they correspond to complex transformation()s that produce multiple RDDs; they will be detailed shortly.

| Transformation | Generated RDDs | compute() |
| --- | --- | --- |
| map(func) | MappedRDD | iterator(split).map(f) |
| filter(func) | FilteredRDD | iterator(split).filter(f) |
| flatMap(func) | FlatMappedRDD | iterator(split).flatMap(f) |
| mapPartitions(func) | MapPartitionsRDD | f(iterator(split)) |
| mapPartitionsWithIndex(func) | MapPartitionsRDD | f(split.index, iterator(split)) |
| sample(withReplacement, fraction, seed) | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) / BernoulliSampler.sample(iterator(split)) |
| pipe(command, [envVars]) | PipedRDD | |
| union(otherDataset) | | |
| intersection(otherDataset) | | |
| distinct([numTasks]) | | |
| groupByKey([numTasks]) | | |
| reduceByKey(func, [numTasks]) | | |
| sortByKey([ascending], [numTasks]) | | |
| join(otherDataset, [numTasks]) | | |
| cogroup(otherDataset, [numTasks]) | | |
| cartesian(otherDataset) | | |
| coalesce(numPartitions) | | |
| repartition(numPartitions) | | |

2. How to build data dependency between RDDs?

This question can be divided into three smaller questions:

  • Does an RDD (e.g., RDD x) depend on one parent RDD or on several parent RDDs?
  • How many partitions are there in RDD x?
  • What is the relationship between the partitions of RDD x and those of its parent RDD(s)? Does one partition depend on one or on several partitions of the parent RDD?

The first question is trivial: x = rdda.transformation(rddb) means that RDD x depends on rdda and rddb. For example, val x = a.join(b) means that RDD x depends on both RDD a and RDD b.

For the second question, as mentioned before, the number of partitions is defined by the user; by default, it is max(partitionNum of parentRDD1, ..., partitionNum of parentRDDn).
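A small sketch of that default, assuming spark.default.parallelism has not been set explicitly and neither parent carries a partitioner:

```scala
val a = sc.parallelize(1 to 10, 4).map(i => (i, "a"))   // 4 partitions
val b = sc.parallelize(1 to 10, 6).map(i => (i, "b"))   // 6 partitions
val joined = a.join(b)                                  // no numPartitions given
println(joined.partitions.length)                       // 6 = max(4, 6)
```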

The third question is a bit more complex; we need to consider the semantics of the transformation(). Different transformation()s have different data dependencies. For example, map() is 1:1, while groupByKey() produces a ShuffledRDD in which each partition depends on all partitions of its parent RDD. Some other transformation()s can be even more complex.

In Spark, there are two kinds of data partition dependencies between RDDs:

  • NarrowDependency (e.g., OneToOneDependency and RangeDependency)

    Each partition of the child RDD fully depends on a small number of partitions of its parent RDD. Fully depends (i.e., FullDependency) means that a child partition depends on the entire parent partition.

  • ShuffleDependency (or wide dependency, as mentioned in Matei's paper)

    Multiple child partitions partially depend on a parent partition. Partially depends (i.e., PartialDependency) means that each child partition depends on a part of the parent partition.

For example, map() leads to a narrow dependency, while join() usually leads to wide dependencies.

Moreover, a child partition can depend on one partition in a parent RDD and one partition in another parent RDD.

Note that:

  • For NarrowDependency, whether a child partition depends on one or on multiple parent partitions is determined by the getParents(partition i) function of the child RDD. (More details later.)
  • ShuffleDependency is like the shuffle dependency in MapReduce (the mapper partitions its output, then each reducer fetches the needed output partitions via http.fetch).

The two dependencies are illustrated in the following figure.

According to the definition, the first three cases are NarrowDependency and the last one is ShuffleDependency.

Note that the left one in the second row is a rare case: an N:N NarrowDependency. Although it looks like a ShuffleDependency, it is a full dependency. It can be created by some tricky transformation()s. We will not discuss this case, because NarrowDependency essentially means "each partition of the parent RDD is used by at most one partition of the child RDD" in the Spark source code.

In summary, data partition dependencies are listed below:

  • NarrowDependency (black arrow)
    • RangeDependency => only used for UnionRDD
    • OneToOneDependency (1:1) => e.g., map(), filter()
    • NarrowDependency (N:1) => e.g., co-partitioned join()
    • NarrowDependency (N:N) => a rare case
  • ShuffleDependency (red arrow)

Note that, in the rest of this chapter, NarrowDependency will be drawn as black arrows and ShuffleDependency as red ones.

The classification into NarrowDependency and ShuffleDependency is needed for generating the physical plan, which will be detailed in the next chapter.

3. How to compute records in the RDD?

A OneToOneDependency case is shown in the following figure. Although the data partition relationship is 1:1, it doesn't mean that the records in each partition have to be read, computed, and output one by one.

The difference between the two patterns on the right side is analogous to the difference between the following two code snippets.

Code of iter.f():

```
int[] array = {1, 2, 3, 4, 5}
for (int i = 0; i < array.length; i++)
    output f(array[i])
```

Code of f(iter):

```
int[] array = {1, 2, 3, 4, 5}
output f(array)
```
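In Spark terms, the two patterns roughly correspond to map() and mapPartitions(); a small sketch (assuming an RDD[Int] named nums):

```scala
val nums = sc.parallelize(1 to 5, 2)

// iter.f(): the function is applied to one record at a time.
val perRecord = nums.map(x => x * 2)

// f(iter): the function receives the whole partition iterator at once,
// so per-partition work (e.g., opening a connection) happens only once.
val perPartition = nums.mapPartitions { iter =>
  // any one-time setup for this partition would go here
  iter.map(x => x * 2)
}
```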

4. Illustration of typical data partition dependencies

1) union(otherRDD)

union() simply combines two RDDs together without changing the data of any partition. RangeDependency (essentially 1:1) retains the borders of the original RDDs so that it is easy to revisit the original partitions.
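A minimal sketch of this behavior:

```scala
val a = sc.parallelize(1 to 10, 2)    // 2 partitions
val b = sc.parallelize(11 to 20, 3)   // 3 partitions
val u = a.union(b)
println(u.partitions.length)          // 5: the original partitions are kept side by side
```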

2) groupByKey(numPartitions) [changed in 1.3]

We have talked about groupByKey's dependency before; now we make it clearer.

groupByKey() aggregates records with the same key through a shuffle. The compute() function of ShuffledRDD fetches the necessary data for its partitions, then performs a mapPartitions() operation in a OneToOneDependency style. Finally, the ArrayBuffer type in the values is cast to Iterable.

groupByKey() has no map-side combine, because map-side combine does not reduce the amount of data shuffled, and it requires all map-side data to be inserted into a hash table, leading to too many objects in the old generation.

ArrayBuffer here is essentially a CompactBuffer, which is an append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
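A minimal usage sketch:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val grouped = pairs.groupByKey(2)      // RDD[(String, Iterable[Int])]
grouped.collect().foreach(println)     // e.g. (a,CompactBuffer(1, 3)) and (b,CompactBuffer(2))
```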

3) reduceByKey(func, numPartitions) [changed in 1.3]

reduceByKey() is similar to reduce() in MapReduce; the data flow is equivalent. reduceByKey enables map-side combine by default, which is carried out by mapPartitions() before the shuffle and results in a MapPartitionsRDD. After the shuffle, aggregate + mapPartitions() is applied to the ShuffledRDD. Again, we get a MapPartitionsRDD.
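A minimal usage sketch of the map-side combine followed by the shuffle-side reduce:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)), 2)
val sums = pairs.reduceByKey(_ + _)   // partial sums per partition, then merged after the shuffle
sums.collect().foreach(println)       // (a,3), (b,7)
```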

4) distinct(numPartitions)

distinct() aims to deduplicate RDD records. Since duplicated records can be found in different partitions, a shuffle plus aggregation is needed to deduplicate them. However, the shuffle requires the RDD to be of type RDD[(K, V)]. If the original records have only keys (e.g., RDD[Int]), they are completed to <K, null> by a map() (resulting in a MappedRDD). After that, reduceByKey() is used to do the shuffle (mapSideCombine => reduce => MapPartitionsRDD). Finally, only the key is taken from each <K, null> by map() (MappedRDD). The blue RDDs are exactly the RDDs inside reduceByKey().
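The map + reduceByKey + map chain described above can be written out by hand; the following sketch is equivalent in spirit to what distinct() does (a simplification, not the literal Spark source):

```scala
val data = sc.parallelize(Seq(3, 1, 3, 2, 1), 2)

// complete each record to (K, null), deduplicate by key, then drop the null
val dedup = data.map(x => (x, null))
  .reduceByKey((x, _) => x, 2)
  .map(_._1)

println(dedup.collect().sorted.toList)   // List(1, 2, 3)
```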

5) cogroup(otherRDD, numPartitions)

Different from groupByKey(), cogroup() aggregates two or more RDDs. Here is a question: should the partition relationship between (RDD a, RDD b) and CoGroupedRDD be a ShuffleDependency or a OneToOneDependency? This question is a bit complex and relates to the following two items.

  • Number of partitions

    The number of partitions in CoGroupedRDD is defined by the user and has nothing to do with RDD a and RDD b. However, if the number of partitions of CoGroupedRDD differs from that of RDD a or RDD b, the partition dependency cannot be a OneToOneDependency.

  • Type of the partitioner

    The partitioner defined by the user (HashPartitioner by default) determines how the data is partitioned. If RDD a, RDD b, and CoGroupedRDD have the same number of partitions but different partitioners, the partition dependency cannot be a OneToOneDependency. Let's take the last case in the above figure as an example: RDD a uses a RangePartitioner, RDD b uses a HashPartitioner, and CoGroupedRDD uses a RangePartitioner with the same number of partitions as RDD a. Obviously, the records in each partition of RDD a can be sent directly to the corresponding partition of CoGroupedRDD, but those in RDD b need to be divided and shuffled into the right partitions of CoGroupedRDD.

To conclude, a OneToOneDependency occurs if the partitioner type and the number of partitions of a parent RDD and CoGroupedRDD are the same; otherwise, the dependency must be a ShuffleDependency. More details can be found in the source code of CoGroupedRDD.getDependencies().
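A sketch of the OneToOneDependency case, where both parents and the result use the same HashPartitioner with the same number of partitions:

```scala
import org.apache.spark.HashPartitioner

val a = sc.parallelize(Seq((1, "a1"), (2, "a2"))).partitionBy(new HashPartitioner(3))
val b = sc.parallelize(Seq((1, "b1"), (3, "b3"))).partitionBy(new HashPartitioner(3))

// Same partitioner and same number of partitions on both sides, so each dependency
// of the underlying CoGroupedRDD can be a OneToOneDependency.
val cg = a.cogroup(b, new HashPartitioner(3))   // RDD[(Int, (Iterable[String], Iterable[String]))]
cg.collect().foreach(println)
```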

How does Spark keep multiple partition dependencies for each partition in CoGroupedRDD?

Firstly, CoGroupedRDD puts all the parent RDDs into rdds: Array[RDD].

Then,

```
for each rdd = rdds(i):
    if the dependency between CoGroupedRDD and rdd is OneToOneDependency
        deps(i) = new OneToOneDependency(rdd)
    else
        deps(i) = new ShuffleDependency(rdd)
```

Finally, it returns deps: Array[Dependency], an array holding one Dependency for each parent RDD.

Dependency.getParents(partition id) returns partitions: List[Int], the parent partitions of the given partition (id) with respect to that Dependency.

getPartitions() tells how many partitions exist in an RDD and how each partition is serialized.

6) intersection(otherRDD)

intersection() aims to extract all the common elements of RDD a and RDD b. RDD[T] is first mapped into RDD[(T, null)], where T cannot be a collection type. Then a.cogroup(b) (colored blue in the figure) is performed. Next, filter() keeps only the records in which neither of [iter(groupA()), iter(groupB())] is empty (FilteredRDD). Finally, only the keys of the records are kept (MappedRDD).
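The chain described above can be spelled out by hand; a sketch that mirrors it (simplified, not the literal Spark source):

```scala
val a = sc.parallelize(Seq(1, 2, 3, 4), 2)
val b = sc.parallelize(Seq(3, 4, 5), 2)

val common = a.map(v => (v, null))
  .cogroup(b.map(v => (v, null)))
  .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
  .keys

println(common.collect().sorted.toList)   // List(3, 4)
```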

7) join(otherRDD, numPartitions)

join() takes two RDD[(K, V)]s, like a join in SQL. Similar to intersection(), it does cogroup() first, which results in a MappedValuesRDD of type RDD[(K, (Iterable[V1], Iterable[V2]))]. Then it computes the Cartesian product between the two Iterables, and finally flatMap() is performed.
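A sketch of the same idea written directly with cogroup() and flatMapValues(); the real join() implementation has essentially this shape:

```scala
val a = sc.parallelize(Seq((1, "a1"), (2, "a2")), 2)
val b = sc.parallelize(Seq((1, "b1"), (1, "b2")), 2)

// cogroup, then emit the Cartesian product of the two value groups for each key
val joined = a.cogroup(b).flatMapValues { case (vs, ws) =>
  for (v <- vs; w <- ws) yield (v, w)
}

joined.collect().foreach(println)   // (1,(a1,b1)) and (1,(a1,b2)); key 2 has no match and is dropped
```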

Here are two examples. In the first one, RDD 1 and RDD 2 use RangePartitioner while CoGroupedRDD uses HashPartitioner, so the partition dependency is a ShuffleDependency. In the second one, RDD 1 has already been partitioned on its key by HashPartitioner and has 3 partitions. Since CoGroupedRDD also uses HashPartitioner and generates 3 partitions, their dependency is a OneToOneDependency. Furthermore, if RDD 2 has also been partitioned by HashPartitioner(3), all the dependencies will be OneToOneDependency. This kind of join is called a hash join.

8) sortByKey(ascending, numPartitions)

sortByKey() sorts the records of RDD[(K, V)] by key. ascending is a self-explanatory boolean flag. sortByKey() produces a ShuffledRDD that uses a rangePartitioner. The partitioner decides the borders of each partition; for example, the first partition takes records with keys from char A to char B, and the second takes those from char C to char D. Inside each partition, records are sorted by key. Finally, the records in the MapPartitionsRDD are in order.

sortByKey() uses an Array to store the records of each partition, then sorts them.
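A minimal usage sketch:

```scala
val pairs = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)), 2)
val sorted = pairs.sortByKey(ascending = true, numPartitions = 2)
sorted.collect().foreach(println)   // (a,1), (b,2), (c,3): partitions hold key ranges, each sorted internally
```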

9) cartesian(otherRDD)

cartesian() returns the Cartesian product of two RDDs. The resulting RDD has #partition(RDD a) x #partition(RDD b) partitions.

Pay attention to the dependency: each partition of CartesianRDD fully depends on one entire partition from each of the two parent RDDs, so these dependencies are all NarrowDependency.

CartesianRDD.getDependencies() returns rdds: Array(RDD a, RDD b). The i-th partition of CartesianRDD depends on (see the sketch after the list):

  • a.partitions(i / #partitionB)
  • b.partitions(i % #partitionB)
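A small sketch of the resulting partition count; the partition-to-partition mapping follows the two formulas above:

```scala
val a = sc.parallelize(1 to 4, 2)          // 2 partitions
val b = sc.parallelize(Seq("x", "y"), 3)   // 3 partitions
val c = a.cartesian(b)
println(c.partitions.length)               // 6 = 2 * 3
c.collect().foreach(println)               // (1,x), (1,y), ..., (4,y): 8 pairs in total
```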

10) coalesce(numPartitions, shuffle = false)

coalesce() can reorganize partitions, e.g., decrease the number of partitions from 5 to 3, or increase it from 5 to 10. Note that when shuffle = false, we cannot increase the number of partitions, because that would require a shuffle.
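A small sketch of the two modes (the behavior of the shuffle-free case matches the note above):

```scala
val a = sc.parallelize(1 to 100, 5)

val fewer = a.coalesce(3)                   // no shuffle: parent partitions are grouped (N:1 NarrowDependency)
val more  = a.coalesce(10, shuffle = true)  // increasing the partition count requires a shuffle

println(fewer.partitions.length)            // 3
println(more.partitions.length)             // 10
println(a.coalesce(10).partitions.length)   // still 5: without shuffle the count cannot grow
```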

To understand coalesce(), we need to know the relationship between CoalescedRDD's partitions and its parent's partitions:

  • coalesce(shuffle = false). Since shuffle is disabled, all we can do is group certain parent partitions together. In fact, many factors have to be taken into consideration to achieve a good grouping, e.g., the number of records in each partition, locality, balance, etc. Spark has a rather complicated algorithm for this (we will not talk about it for the moment). For example, a.coalesce(3, shuffle = false) is essentially a NarrowDependency of N:1.

  • coalesce(shuffle = true). When shuffle is enabled, coalesce simply divides all records of the RDD into N partitions, which can be done by the following trick (similar to a round-robin algorithm):

    • In each partition, every record is assigned a key, which is an increasing number.
    • Hashing the keys leads to a roughly uniform distribution of records over all the partitions.

    In the second example, every record of RDD a is paired with an increasing key (on the left side of the pair). The key of the first record in each partition is equal to (new Random(index)).nextInt(numPartitions), where index is the index of the partition and numPartitions is the number of partitions in CoalescedRDD. The subsequent keys increase by 1. After the shuffle, the records in ShuffledRDD are uniformly distributed. The relationship between ShuffledRDD and CoalescedRDD is defined by a complicated algorithm. In the end, the keys are removed (MappedRDD).

11) repartition(numPartitions)

Equivalent to coalesce(numPartitions, shuffle = true)

The primitive transformation()

combineByKey()

So far, we have analyzed a lot of logical plans. It's true that some of them are very similar. The reason is that they share the same shuffle + aggregate behavior:

The RDD on the left side of the ShuffleDependency is an RDD[(K, V)], while on the right side all records with the same key are aggregated, and then different operations are applied to the aggregated records.

In fact, many transformation()s, like groupByKey() and reduceByKey(), execute aggregate() while doing the logical computation. So the similarity is that aggregate() and compute() are executed at the same time. Spark uses combineByKey() to implement this aggregate() + compute() operation.

Here is the definition of combineByKey():

```scala
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
```

There are three important parameters:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds an element to the end of the list)
  • mergeCombiners, to combine two C's into a single one (e.g., merges two lists into a new one).

Details:

  • When some (K, V) records are being pushed to combineByKey(), createCombiner takes the first record to initialize a combiner of type C (e.g., C = V).
  • From then on, mergeValue takes every incoming record, mergeValue(combiner, record.value), to update the combiner. Let's take sum as an example: combiner = combiner + record.value. In the end, all the records concerned are merged into the combiner.
  • If there is another set of records with the same key as the pairs above, combineByKey() produces another combiner'. In the last step, the final result is equal to mergeCombiners(combiner, combiner'). (A usage sketch follows below.)
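As a usage sketch, the classic per-key average can be written with these three functions (this uses the simpler combineByKey overload that takes only the three functions and the default partitioner):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), 2)

// combiner C = (sum, count); the per-key average is derived from it afterwards
val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),                                                // createCombiner
  (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                   // mergeValue
  (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2) // mergeCombiners
)

val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect().foreach(println)   // (a,3.0), (b,3.0)
```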

Conclusion

So far, we have discussed how to produce a job's logical plan, as well as the complex partition dependencies and computations behind Spark.

transformation() decides what kinds of RDDs will be produced. Some transformation()s are reused by other operations (e.g., cogroup).

The dependencies of an RDD depend on the semantics of its transformation(). For example, CoGroupedRDD depends on all the RDDs used for cogroup().

The relationships between RDD partitions are NarrowDependency and ShuffleDependency. The former is a full dependency and the latter is a partial dependency. NarrowDependency covers many cases; for example, a dependency is a OneToOneDependency when the RDDs' number of partitions and partitioner type are the same.

In terms of dataflow, MapReduce is equivalent to map() + reduceByKey(). Technically, the reduce() of MapReduce is more powerful than reduceByKey(). The design and implementation of shuffle will be detailed in the chapter Shuffle details.
